package com.aotain.nyx.port;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

import com.aotain.nyx.ipstat.IPStatTuple;
import com.aotain.nyx.statis.AbnStatisTuple;

/**
 * Flink map function that turns an {@link AbnStatisTuple} into a pair of
 * (destination IP, {@link IPStatTuple}).
 *
 * <p>The produced {@link IPStatTuple} carries the session count, destination
 * IP and destination port copied from the incoming record; no other fields of
 * the statistic are populated here.
 */
public class PortAbnStatisMap implements MapFunction<AbnStatisTuple, Tuple2<String,IPStatTuple>>{

	private static final long serialVersionUID = -6051947535515710869L;

	/**
	 * Builds the (destination IP, statistic) pair for a single record.
	 *
	 * @param value the incoming record; may be {@code null}
	 * @return a tuple holding the record's destination IP and a freshly
	 *         created {@link IPStatTuple}; for a {@code null} input, a tuple
	 *         created with the no-argument constructor whose fields were
	 *         never assigned
	 * @throws Exception declared by the {@link MapFunction} contract; nothing
	 *         in this method throws it explicitly
	 */
	@Override
	public Tuple2<String, IPStatTuple> map(AbnStatisTuple value)
			throws Exception {
		// Guard clause: a missing record still yields a (field-less) tuple,
		// never a null return value.
		// NOTE(review): downstream operators presumably have to cope with the
		// unset fields of this tuple - confirm against the job topology.
		if (value == null) {
			return new Tuple2<String, IPStatTuple>();
		}

		final String destIpKey = value.getDestIP();

		// Copy only the fields this mapping cares about.
		final IPStatTuple portStat = new IPStatTuple();
		portStat.setSessionNum(value.getSessionNum());
		portStat.setDestIP(value.getDestIP());
		portStat.setDestPort(value.getDestPort());

		return new Tuple2<String, IPStatTuple>(destIpKey, portStat);
	}

}

    